/*
 * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
 * The software in this package is published under the terms of the CPAL v1.0
 * license, a copy of which has been included with this distribution in the
 * LICENSE.txt file.
 */
package org.mule.compatibility.core.transport;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mule.compatibility.core.registry.MuleRegistryTransportHelper.registerConnector;

import org.mule.compatibility.core.api.endpoint.OutboundEndpoint;
import org.mule.compatibility.core.api.transport.MessageDispatcher;
import org.mule.runtime.core.api.config.ThreadingProfile;
import org.mule.runtime.core.config.ImmutableThreadingProfile;
import org.mule.runtime.core.util.concurrent.Latch;
import org.mule.tck.junit4.AbstractMuleContextEndpointTestCase;
import org.mule.tck.testmodels.mule.TestConnector;

import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.junit.Test;

/**
 * Checks how the dispatcher pool of a {@link TestConnector} is configured from its dispatcher threading profile, and how
 * borrowing behaves once the single pooled dispatcher of an endpoint is already in use, for each exhausted action
 * exercised below.
 */
public class DispatcherPoolTestCase extends AbstractMuleContextEndpointTestCase {

  /**
   * With a {@code WHEN_EXHAUSTED_RUN} threading profile the pool is expected to allow two active and two idle dispatchers,
   * to block when exhausted and to wait without a timeout ({@code -1}).
   */
  @Test
  public void testDefaultDispatcherPoolConfiguration() throws Exception {
    final TestConnector connector = createConnectorWithSingleObjectDispatcherPool(ThreadingProfile.WHEN_EXHAUSTED_RUN);

    // ThreadingProfile exhausted action default is RUN
    assertEquals(ThreadingProfile.WHEN_EXHAUSTED_RUN, connector.getDispatcherThreadingProfile().getPoolExhaustedAction());
    assertEquals(2, connector.dispatchers.getMaxActive());
    // This must equal maxActive dispatchers because low maxIdle would result in
    // a lot of dispatcher churn
    assertEquals(2, connector.dispatchers.getMaxIdle());
    assertEquals(GenericKeyedObjectPool.WHEN_EXHAUSTED_BLOCK, connector.dispatchers.getWhenExhaustedAction());
    assertEquals(-1, connector.dispatchers.getMaxWait());
  }

  /**
   * With a {@code WHEN_EXHAUSTED_WAIT} threading profile the pool is expected to allow exactly one active and one idle
   * dispatcher, to block when exhausted and to wait without a timeout ({@code -1}).
   */
  @Test
  public void testDefaultDispatcherPoolConfigurationThreadingProfileWait() throws Exception {
    final TestConnector connector = createConnectorWithSingleObjectDispatcherPool(ThreadingProfile.WHEN_EXHAUSTED_WAIT);

    assertEquals(ThreadingProfile.WHEN_EXHAUSTED_WAIT, connector.getDispatcherThreadingProfile().getPoolExhaustedAction());
    assertEquals(1, connector.dispatchers.getMaxActive());
    assertEquals(1, connector.dispatchers.getMaxIdle());
    assertEquals(GenericKeyedObjectPool.WHEN_EXHAUSTED_BLOCK, connector.dispatchers.getWhenExhaustedAction());
    assertEquals(-1, connector.dispatchers.getMaxWait());
  }

  /**
   * While another thread holds the only dispatcher, a second borrow must succeed once that dispatcher has been handed
   * back, leaving exactly one active dispatcher.
   */
  @Test
  public void testDispatcherPoolDefaultBlockExhaustedAction() throws Exception {
    final TestConnector connector = createConnectorWithSingleObjectDispatcherPool(ThreadingProfile.WHEN_EXHAUSTED_WAIT);
    connector.setDispatcherPoolMaxWait(100);

    assertEquals(1, connector.dispatchers.getMaxActive());
    assertEquals(100, connector.dispatchers.getMaxWait());

    final OutboundEndpoint endpoint = getTestOutboundEndpoint("test", "test://test");

    Latch dispatcherBorrowedLatch = new Latch();
    Latch assertedLatch = new Latch();

    holdDispatcherUntilReleased(connector, endpoint, dispatcherBorrowedLatch, assertedLatch);
    try {
      dispatcherBorrowedLatch.await();
      assertEquals(1, connector.dispatchers.getNumActive());
    } finally {
      // Always let the background thread hand its dispatcher back, even when the assertion above fails;
      // otherwise that thread would stay parked on the latch after the test has finished.
      assertedLatch.countDown();
    }
    connector.dispatchers.borrowObject(endpoint);
    assertEquals(1, connector.dispatchers.getNumActive());
  }

  /**
   * While another thread keeps holding the only dispatcher, a second borrow with a 10 ms max wait must throw, and the
   * number of active dispatchers must remain one.
   */
  @Test
  public void testDispatcherPoolBlockTimeoutExhaustedAction() throws Exception {
    final TestConnector connector = createConnectorWithSingleObjectDispatcherPool(ThreadingProfile.WHEN_EXHAUSTED_WAIT);
    connector.setDispatcherPoolMaxWait(10);

    assertEquals(1, connector.dispatchers.getMaxActive());
    assertEquals(10, connector.dispatchers.getMaxWait());

    final OutboundEndpoint endpoint = getTestOutboundEndpoint("test", "test://test");

    Latch dispatcherBorrowedLatch = new Latch();
    Latch assertedLatch = new Latch();

    holdDispatcherUntilReleased(connector, endpoint, dispatcherBorrowedLatch, assertedLatch);
    try {
      dispatcherBorrowedLatch.await();
      assertEquals(1, connector.dispatchers.getNumActive());
      try {
        connector.dispatchers.borrowObject(endpoint);
        // fail() raises an AssertionError, which the catch (Exception) below deliberately does not swallow
        fail("Exception expected");
      } catch (Exception e) {
        assertEquals(1, connector.dispatchers.getNumActive());
      }
    } finally {
      // Release the background thread on every path, including assertion failures, so it never leaks
      assertedLatch.countDown();
    }
  }

  /**
   * With the pool's exhausted action overridden to {@code WHEN_EXHAUSTED_GROW}, a second borrow must succeed even though
   * max active is one, yielding two active dispatchers.
   */
  @Test
  public void testDispatcherPoolGrowExhaustedAction() throws Exception {
    final TestConnector connector = createConnectorWithSingleObjectDispatcherPool(ThreadingProfile.WHEN_EXHAUSTED_WAIT);
    connector.setDispatcherPoolWhenExhaustedAction(GenericKeyedObjectPool.WHEN_EXHAUSTED_GROW);

    assertEquals(1, connector.dispatchers.getMaxActive());

    final OutboundEndpoint endpoint = getTestOutboundEndpoint("test", "test://test");

    connector.dispatchers.borrowObject(endpoint);
    connector.dispatchers.borrowObject(endpoint);
    assertEquals(2, connector.dispatchers.getNumActive());

  }

  /**
   * With the pool's exhausted action overridden to {@code WHEN_EXHAUSTED_FAIL}, a second borrow must throw and the number
   * of active dispatchers must remain one.
   */
  @Test
  public void testDispatcherPoolFailExhaustedAction() throws Exception {
    final TestConnector connector = createConnectorWithSingleObjectDispatcherPool(ThreadingProfile.WHEN_EXHAUSTED_WAIT);
    connector.setDispatcherPoolWhenExhaustedAction(GenericKeyedObjectPool.WHEN_EXHAUSTED_FAIL);

    assertEquals(1, connector.dispatchers.getMaxActive());

    final OutboundEndpoint endpoint = getTestOutboundEndpoint("test", "test://test");

    connector.dispatchers.borrowObject(endpoint);
    try {
      connector.dispatchers.borrowObject(endpoint);
      fail("Exception expected");
    } catch (Exception e) {
      assertEquals(1, connector.dispatchers.getNumActive());
    }
  }

  /**
   * Starts a thread that borrows a dispatcher for {@code endpoint}, signals {@code borrowedLatch}, waits for
   * {@code releaseLatch} and then returns the dispatcher to the pool.
   * <p>
   * {@code borrowedLatch} is counted down exactly once whether or not the borrow succeeded, so a caller awaiting it can
   * never block forever; a failed borrow then surfaces through the caller's own assertions on the pool state.
   *
   * @param connector connector whose dispatcher pool is used
   * @param endpoint pool key to borrow the dispatcher for
   * @param borrowedLatch counted down as soon as the borrow attempt has completed
   * @param releaseLatch awaited before the borrowed dispatcher is returned
   */
  private void holdDispatcherUntilReleased(final TestConnector connector, final OutboundEndpoint endpoint,
                                           final Latch borrowedLatch, final Latch releaseLatch) {
    new Thread(() -> {
      MessageDispatcher messageDispatcher;
      try {
        messageDispatcher = (MessageDispatcher) connector.dispatchers.borrowObject(endpoint);
      } catch (Exception e) {
        e.printStackTrace();
        return;
      } finally {
        // Runs on both the success and the failure path: the waiting test thread must always be woken up
        borrowedLatch.countDown();
      }

      try {
        releaseLatch.await();
        connector.dispatchers.returnObject(endpoint, messageDispatcher);
      } catch (Exception e) {
        e.printStackTrace();
      }
    }).start();
  }

  /**
   * Builds a {@link TestConnector} whose dispatcher threading profile is an {@link ImmutableThreadingProfile} created with
   * {@code 1} for each of its five leading numeric arguments and the given exhausted action, creates a receiver for the
   * {@code test://test} inbound endpoint on it and registers the connector in the Mule registry.
   * <p>
   * NOTE(review): the receiver creation is presumably what brings the connector into a state where its dispatcher pool is
   * configured - confirm against the connector implementation.
   *
   * @param exhaustedAction one of the {@code ThreadingProfile.WHEN_EXHAUSTED_*} constants
   * @return the registered connector
   */
  private TestConnector createConnectorWithSingleObjectDispatcherPool(int exhaustedAction) throws Exception {
    TestConnector connector = new TestConnector(muleContext);
    ThreadingProfile threadingProfile = new ImmutableThreadingProfile(1, 1, 1, 1, 1, exhaustedAction, true, null, null);
    connector.setDispatcherThreadingProfile(threadingProfile);
    connector.createReceiver(getTestFlow(), getTestInboundEndpoint("test", "test://test"));
    registerConnector(muleContext.getRegistry(), connector);
    return connector;
  }

}
